package com.boot.kafka.jpa.elasticsearch.service.impl;

import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.boot.base.common.utils.ErrorUtils;
import com.boot.base.common.utils.JsonUtil;
import com.boot.kafka.jpa.elasticsearch.domain.EsEntity;
import com.boot.kafka.jpa.elasticsearch.service.ElasticsearchService;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.core.CountRequest;
import org.elasticsearch.client.core.CountResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.function.Consumer;

/**
 * {@link ElasticsearchService} implementation that delegates to the Elasticsearch
 * {@link RestHighLevelClient}.
 *
 * <p>Unless a method declares {@code throws Exception}, failures raised by the client are
 * rethrown wrapped in a {@link RuntimeException} with the original exception as the cause.
 *
 * @author weicy4
 * @since 2020/0820
 */
@Service
public class ElasticsearchServiceImpl implements ElasticsearchService {
    /** Page size used by {@code scroll} when the caller passes a non-positive {@code batchCount}. */
    private static final int DEFAULT_SCROLL_BATCH_SIZE = 500;

    protected final Logger log = LoggerFactory.getLogger(this.getClass());

    @Autowired
    RestHighLevelClient restHighLevelClient;

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists
     * @throws Exception if the existence request fails
     */
    @Override
    public boolean indexExist(String index) throws Exception {
        GetIndexRequest request = new GetIndexRequest(index);
        request.local(false);
        request.humanReadable(true);
        request.includeDefaults(false);
        return restHighLevelClient.indices().exists(request, RequestOptions.DEFAULT);
    }

    /**
     * Inserts a single document, or replaces the document already stored under the same id.
     *
     * @param index  target index
     * @param entity carrier of the document id and payload; a {@code String} payload is sent
     *               as-is as JSON, any other payload is serialized with {@code JsonUtil}
     */
    @Override
    public void insertOrUpdateOne(String index, EsEntity entity) {
        IndexRequest request = buildIndexRequest(index, entity);
        try {
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Inserts (or replaces) a batch of documents with a single bulk request.
     *
     * <p>A {@code null} or empty list is a no-op: the client rejects a bulk request that
     * contains no actions. Per-item failures reported in the bulk response are logged.
     *
     * @param index target index
     * @param list  documents to insert
     */
    @Override
    public void insertBatch(String index, List<EsEntity> list) {
        if (list == null || list.isEmpty()) {
            return;
        }
        BulkRequest request = new BulkRequest();
        for (EsEntity item : list) {
            // Same payload handling as insertOrUpdateOne, so String payloads are not re-serialized.
            request.add(buildIndexRequest(index, item));
        }
        executeBulk(request);
    }

    /**
     * Deletes a batch of documents by id with a single bulk request.
     *
     * <p>A {@code null} or empty collection is a no-op: the client rejects a bulk request that
     * contains no actions. Per-item failures reported in the bulk response are logged.
     *
     * @param index  target index
     * @param idList ids of the documents to delete; each id is converted with {@code toString()}
     */
    @Override
    public <T> void deleteBatch(String index, Collection<T> idList) {
        if (idList == null || idList.isEmpty()) {
            return;
        }
        BulkRequest request = new BulkRequest();
        for (T id : idList) {
            request.add(new DeleteRequest(index, id.toString()));
        }
        executeBulk(request);
    }

    /**
     * Runs a search and converts every hit's source into an instance of {@code c}.
     *
     * @param index   index to search
     * @param builder search parameters
     * @param c       result class
     * @return the converted hits, in the order returned by Elasticsearch
     */
    @Override
    public <T> List<T> search(String index, SearchSourceBuilder builder, Class<T> c) {
        SearchRequest request = new SearchRequest(index);
        request.source(builder);
        try {
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            // Total hits are absent when the caller disabled total-hit tracking on the builder.
            if (response.getHits().getTotalHits() != null) {
                long total = response.getHits().getTotalHits().value;
                log.info("total {}", total);
            }
            return toObjects(response.getHits().getHits(), c);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs a search over several indices and returns the raw hits.
     *
     * @param indexs  indices to search
     * @param builder search parameters (including any paging the caller configured)
     * @return fixed-size list view over the hits of the response
     */
    @Override
    public <T> List<SearchHit> queryPage(String[] indexs, SearchSourceBuilder builder) {
        SearchRequest request = new SearchRequest(indexs);
        request.source(builder);
        try {
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            return Arrays.asList(response.getHits().getHits());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Runs a paged search and fills {@code page} with the total hit count and the converted
     * records.
     *
     * <p>The {@code from}, {@code size} and {@code trackTotalHits} settings of {@code builder}
     * are overwritten by this method. If Elasticsearch answers with {@code NOT_FOUND} (for
     * example because the index was just dropped) an empty page is returned instead of failing.
     *
     * @param index   index to search
     * @param builder search parameters
     * @param c       result class
     * @param page    paging request; {@code current} is 1-based, a negative size falls back to 10
     * @return the same {@code page} instance, populated
     */
    @Override
    public <T> IPage<T> searchPage(String index, SearchSourceBuilder builder, Class<T> c, Page<T> page) {
        SearchRequest request = new SearchRequest(index);
        // Elasticsearch's "from" is a 0-based offset, while the incoming page number starts at 1.
        int from = (int) ((page.getCurrent() - 1) * page.getSize());
        from = Math.max(from, 0);
        int size = (int) page.getSize();
        size = (size < 0) ? 10 : size;

        builder.from(from);
        builder.size(size);
        builder.trackTotalHits(true);
        request.source(builder);
        try {
            SearchResponse response = restHighLevelClient.search(request, RequestOptions.DEFAULT);
            long total = response.getHits().getTotalHits().value;
            page.setTotal(total);
            page.setRecords(toObjects(response.getHits().getHits(), c));
            return page;
        } catch (ElasticsearchStatusException e) {
            if (e.status() == RestStatus.NOT_FOUND) {
                log.warn("es查询错误,索引未找到，有可能是清空索引导致的错误，可以忽略。{}", ErrorUtils.getErrorMsg(e));
                page.setTotal(0);
                page.setRecords(new ArrayList<>());
                return page;
            }
            // Trailing throwable makes the logger emit the stack trace.
            log.error("es查询错误{}", ErrorUtils.getErrorMsg(e), e);
            throw new RuntimeException(e);
        } catch (Exception e) {
            log.error("es查询错误{}", ErrorUtils.getErrorMsg(e), e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes an index.
     *
     * @param index index name
     */
    @Override
    public void deleteIndex(String index) {
        try {
            restHighLevelClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Deletes every document of an index that matches a query.
     *
     * @param index   index name
     * @param builder query selecting the documents to delete
     */
    @Override
    public void deleteByQuery(String index, QueryBuilder builder) {
        DeleteByQueryRequest request = new DeleteByQueryRequest(index);
        request.setQuery(builder);
        // Number of documents processed per batch; 10000 is the maximum.
        request.setBatchSize(10000);
        // Keep going on version conflicts instead of aborting the whole operation.
        request.setConflicts("proceed");
        try {
            restHighLevelClient.deleteByQuery(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Counts the documents stored in an index.
     *
     * <p>This method is best-effort: it returns {@code 0} when the index does not exist and
     * also when the lookup fails (the failure is logged, not rethrown).
     *
     * @param index index name
     * @return number of documents, or {@code 0} if the index is missing or the request failed
     */
    @Override
    public long getCountByIndex(String index) {
        try {
            // Counting against a missing index would fail, so check for it first.
            if (!indexExist(index)) {
                return 0;
            }
            CountRequest countRequest = new CountRequest();
            countRequest.indices(index);
            CountResponse response = restHighLevelClient.count(countRequest, RequestOptions.DEFAULT);
            return response.getCount();
        } catch (Exception e) {
            log.error("查询es中索引{}总记录数时发送异常：{}", index, ErrorUtils.getErrorMsg(e), e);
            return 0;
        }
    }

    /**
     * Iterates over all documents of an index with the scroll API, handing each page of hits
     * to {@code consumer}.
     *
     * <p>Does nothing if the index does not exist. The scroll context is released when the
     * iteration ends, including when the consumer or a request throws.
     *
     * @param index         index name
     * @param scrollTimeOut keep-alive of the scroll context between pages, in milliseconds
     * @param batchCount    number of hits requested per page; a non-positive value falls back
     *                      to {@value #DEFAULT_SCROLL_BATCH_SIZE}
     * @param consumer      callback invoked once per non-empty page of hits
     * @throws Exception if a search or scroll request fails, or the consumer throws
     */
    @Override
    public void scroll(String index, Long scrollTimeOut, int batchCount, Consumer<SearchHit[]> consumer) throws Exception {
        if (!indexExist(index)) {
            return;
        }
        Scroll scroll = new Scroll(TimeValue.timeValueMillis(scrollTimeOut));
        int pageSize = batchCount > 0 ? batchCount : DEFAULT_SCROLL_BATCH_SIZE;

        SearchRequest searchRequest = new SearchRequest(index);
        searchRequest.scroll(scroll);
        searchRequest.source(SearchSourceBuilder.searchSource().size(pageSize));

        String scrollId = null;
        try {
            SearchResponse response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            scrollId = response.getScrollId();
            SearchHit[] hits = response.getHits().getHits();
            while (hits != null && hits.length > 0) {
                consumer.accept(hits);

                SearchScrollRequest nextPage = new SearchScrollRequest(scrollId);
                nextPage.scroll(scroll);
                response = restHighLevelClient.scroll(nextPage, RequestOptions.DEFAULT);
                scrollId = response.getScrollId();
                hits = response.getHits().getHits();
            }
        } finally {
            // Release the scroll context promptly instead of waiting for the keep-alive to expire.
            clearScrollQuietly(scrollId);
        }
    }

    /** Builds the index request for one entity, sending String payloads verbatim as JSON. */
    private IndexRequest buildIndexRequest(String index, EsEntity entity) {
        IndexRequest request = new IndexRequest(index);
        request.id(entity.getId());
        if (entity.getData() instanceof String) {
            request.source((String) entity.getData(), XContentType.JSON);
        } else {
            request.source(JsonUtil.objectToJson(entity.getData()), XContentType.JSON);
        }
        return request;
    }

    /** Executes a bulk request, logging per-item failures and wrapping client errors. */
    private void executeBulk(BulkRequest request) {
        try {
            BulkResponse response = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
            // The client does not throw when individual items fail; surface them in the log.
            if (response.hasFailures()) {
                log.error("Elasticsearch bulk request completed with failures: {}", response.buildFailureMessage());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Converts the source of every hit into an instance of {@code c}, preserving order. */
    private <T> List<T> toObjects(SearchHit[] hits, Class<T> c) throws Exception {
        List<T> res = new ArrayList<>(hits.length);
        for (SearchHit hit : hits) {
            res.add(JsonUtil.jsonToObject(hit.getSourceAsString(), c));
        }
        return res;
    }

    /** Releases a scroll context; failures are logged so they cannot mask an earlier exception. */
    private void clearScrollQuietly(String scrollId) {
        if (scrollId == null) {
            return;
        }
        ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
        clearScrollRequest.addScrollId(scrollId);
        try {
            restHighLevelClient.clearScroll(clearScrollRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.warn("Failed to clear Elasticsearch scroll context {}", scrollId, e);
        }
    }

}
